%%capture
%load_ext autoreload
%autoreload 2
%matplotlib inline
%load_ext tfl_training_anomaly_detection
%presentation_style
%%capture
%set_random_seed 12
%load_latex_macros
import numpy as np
import itertools as it
from tqdm import tqdm
import matplotlib
from matplotlib import pyplot as plt
import plotly.express as px
import pandas as pd
import ipywidgets as widgets
from tfl_training_anomaly_detection.exercise_tools import evaluate, get_kdd_data, get_house_prices_data, create_distributions, contamination, \
perform_rkde_experiment, get_mnist_data
from ipywidgets import interact
from sklearn.metrics import roc_auc_score, average_precision_score
from sklearn.model_selection import RandomizedSearchCV
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import LabelBinarizer
from sklearn.ensemble import IsolationForest
from sklearn import metrics
from sklearn.model_selection import train_test_split
from sklearn.decomposition import PCA
from sklearn.neighbors import KernelDensity
from tfl_training_anomaly_detection.vae import VAE, build_decoder_mnist, build_encoder_minst, build_contaminated_minst
from tensorflow import keras
%matplotlib inline
# Default figure size for all plots in this notebook
matplotlib.rcParams['figure.figsize'] = (5, 5)
Idea: Estimate the density of $F_0$. Areas of low density are anomalous.

Definition: Let $D = \{x_1,\ldots,x_N\}\subset \mathbb{R}^p$. The KDE with kernel $K$ and bandwidth $h$ is $KDE_h(x, D) = \frac{1}{N}\sum_{i=1}^N \frac{1}{h^p}K\left(\frac{|x-x_i|}{h}\right)$
![]() |
![]() |
Play with the parameters!
# Sample 500 points from the 2D 'Double Blob' distribution and plot them
dists = create_distributions(dim=2, dim_irrelevant=0)
sample_train = dists['Double Blob'].sample(500)
# The last element of the sample tuple holds the data points
X_train = sample_train[-1]
# All training points are nominal (label 0)
y_train = [0]*len(X_train)
plt.scatter(X_train[:, 0], X_train[:, 1], c='blue', s=10)
plt.show()
# Helper function
def fit_kde(kernel: str, bandwidth: float, X_train: np.ndarray) -> KernelDensity:
    """Fit a kernel density estimator to the training data.

    @param kernel: kernel name understood by sklearn (e.g. 'gaussian', 'tophat')
    @param bandwidth: kernel bandwidth h
    @param X_train: training data of shape (n_samples, n_features)
    @return: the fitted sklearn KernelDensity model
    """
    kde = KernelDensity(kernel=kernel, bandwidth=bandwidth)
    kde.fit(X_train)
    return kde
def visualize_kde(kde: KernelDensity, bandwidth: float, X_test: np.ndarray, y_test: np.ndarray):
    """Plot the KDE density contours on a 50x50 grid over [-10, 10]^2
    together with the test points (blue = nominal, red = anomaly).

    @param kde: fitted KDE model
    @param bandwidth: bandwidth used (only shown in the plot title)
    @param X_test: 2D test data of shape (n_samples, 2)
    @param y_test: test labels (0 = nominal, otherwise anomaly)
    """
    fig, axis = plt.subplots(figsize=(5, 5))
    lin = np.linspace(-10, 10, 50)
    grid_points = list(it.product(lin, lin))
    ys, xs = np.meshgrid(lin, lin)
    # The score function of sklearn returns log-densities
    scores = np.exp(kde.score_samples(grid_points)).reshape(50, 50)
    colormesh = axis.contourf(xs, ys, scores)
    fig.colorbar(colormesh)
    # Typo fix: 'Conturs' -> 'Contours'
    axis.set_title('Density Contours (Bandwidth={})'.format(bandwidth))
    axis.set_aspect('equal')
    color = ['blue' if i == 0 else 'red' for i in y_test]
    plt.scatter(X_test[:, 0], X_test[:, 1], c=color)
    plt.show()
# Globals written by the interactive widget below
ker = None
bdw = None
@interact(
    kernel=['gaussian', 'tophat', 'epanechnikov', 'exponential', 'linear', 'cosine'],
    bandwidth=(.1, 10.)
)
def set_kde_params(kernel: str, bandwidth: float):
    """Helper function to set widget parameters
    @param kernel: kernel
    @param bandwidth: bandwidth
    """
    global ker, bdw
    ker = kernel
    bdw = bandwidth
# Fit with the widget-selected parameters and visualize on the training data
kde = fit_kde(ker, bdw, X_train)
visualize_kde(kde, bdw, X_train, y_train)
The bandwidth is the most important parameter of a KDE model. A wrongly adjusted value will lead to over- or under-smoothing of the density curve.
A common method to select a bandwidth is maximum log-likelihood cross validation. $$h_{\textrm{llcv}} = \arg\max_{h}\frac{1}{k}\sum_{i=1}^k\sum_{y\in D_i}\log\left(\frac{k}{N(k-1)}\sum_{x\in D_{-i}}K_h(x, y)\right)$$ where $D_{-i}$ is the data without the $i$th cross validation fold $D_i$.
Example 1: Noisy sinusoidal
# Generate example: sinusoidal nominal data contaminated with blob anomalies
dists = create_distributions(dim=2)
distribution_with_anomalies = contamination(
    nominal=dists['Sinusoidal'],
    anomaly=dists['Blob'],
    p=0.05  # 5% of test samples come from the anomaly distribution
)
# Train data (clean, anomaly-free)
sample_train = dists['Sinusoidal'].sample(500)
X_train = sample_train[-1].numpy()
# Test data (contaminated); sample[0] holds the anomaly labels
sample_test = distribution_with_anomalies.sample(500)
X_test = sample_test[-1].numpy()
y_test = sample_test[0].numpy()
scatter = plt.scatter(X_test[:, 0], X_test[:, 1], c=y_test)
handels, _ = scatter.legend_elements()
plt.legend(handels, ['Nominal', 'Anomaly'])
plt.gca().set_aspect('equal')
plt.show()
# Hyperparameter search space for the KDE model
param_space = {
    'kernel': ['gaussian', 'tophat', 'epanechnikov', 'exponential', 'linear', 'cosine'],  # Add available kernels
    'bandwidth': np.linspace(0.1, 10, 100),  # Define Search space for bandwidth parameter
}
def hyperopt_by_score(X_train: np.ndarray, param_space: dict, cv: int = 5):
    """Select KDE hyperparameters by randomized cross-validated search.

    The estimator's own score — for KernelDensity, the total log-likelihood
    of the held-out fold — is used as the selection criterion.

    @param X_train: training data of shape (n_samples, n_features)
    @param param_space: mapping from parameter name to candidate values
    @param cv: number of cross-validation folds
    @return: tuple (best_params, best_estimator)
    """
    kde = KernelDensity()
    search = RandomizedSearchCV(
        estimator=kde,
        param_distributions=param_space,
        n_iter=100,
        cv=cv,
        scoring=None  # use estimator's internal scoring function, i.e. the log-probability of the validation set for KDE
    )
    search.fit(X_train)
    return search.best_params_, search.best_estimator_
Run the code below to perform hyperparameter optimization.
params, kde = hyperopt_by_score(X_train, param_space)
print('Best parameters:')
for key in params:
    print('{}: {}'.format(key, params[key]))
# Anomaly score = negative log-density of each test point
test_scores = -kde.score_samples(X_test)
# Replace infinite scores (zero estimated density) with a finite value above the maximum
test_scores = np.where(test_scores == np.inf, np.max(test_scores[np.isfinite(test_scores)])+1, test_scores)
curves = evaluate(y_test, test_scores)
visualize_kde(kde, params['bandwidth'], X_test, y_test)
You are a company responsible for estimating house prices around Ames, Iowa, specifically around the College area. But there is a problem: houses from a nearby area, 'Veenker', are often included in your dataset. You want to build an anomaly detection algorithm that filters, one by one, every point that comes from the wrong neighborhood. You have been able to isolate an X_train dataset which, you are sure, contains only houses from the College area. Following the previous example, test your ability to isolate anomalies in new incoming data (X_test) with KDE.
Advanced exercise: What happens if the contamination comes from other areas? You can choose among the following names:
OldTown, Veenker, Edwards, MeadowV, Somerst, NPkVill, BrDale, Gilbert, NridgHt, Sawyer, Blmngtn, Blueste
# Load the train/test split: train holds only 'CollgCr' houses, test is
# contaminated with houses from the anomaly neighborhood.
X_train, X_test, y_test = get_house_prices_data(neighborhood='CollgCr', anomaly_neighborhood='Veenker')
X_train
# Total data
# DataFrame.append was deprecated in pandas 1.4 and removed in 2.0 -> use pd.concat
train_test_data = pd.concat([X_train, X_test], ignore_index=True)
y_total = [0] * len(X_train) + y_test
fig = px.scatter_3d(train_test_data, x='LotArea', y='OverallCond', z='SalePrice', color=y_total)
fig.show()
# When data are highly in-homogeneous, like in this case, it is often beneficial
# to rescale them before applying any anomaly detection or clustering technique.
scaler = MinMaxScaler()
X_train_rescaled = scaler.fit_transform(X_train)
param_space = {
    'kernel': ['gaussian', 'tophat', 'epanechnikov', 'exponential', 'linear', 'cosine'],  # Add available kernels
    'bandwidth': np.linspace(0.1, 10, 100),  # Define Search space for bandwidth parameter
}
params, kde = hyperopt_by_score(X_train_rescaled, param_space)
print('Best parameters:')
for key in params:
    print('{}: {}'.format(key, params[key]))
# Apply the scaler fitted on train data to the test set (no re-fitting)
X_test_rescaled = scaler.transform(X_test)
test_scores = -kde.score_samples(X_test_rescaled)
# Replace infinite scores (zero estimated density) with a finite value above the maximum
test_scores = np.where(test_scores == np.inf, np.max(test_scores[np.isfinite(test_scores)])+1, test_scores)
curves = evaluate(y_test, test_scores)
The flexibility of KDE comes at a price. The dependency on the dimensionality of the data is quite unfavorable.
Theorem [Stone, 1982] Any estimator that is consistent$^*$ with the class of all $k$-fold differentiable pdfs over $\mathbb{R}^d$ has a convergence rate of at most
$$ \frac{1}{n^{\frac{k}{2k+d}}} $$$^*$Consistency = for all pdfs $p$ in the class: $\lim_{n\to\infty}|KDE_h(x, D) - p(x)|_\infty = 0$ with probability $1$.
We take a look at a higher dimensional version of our previous data set.
# 3D version of the sinusoidal data with 1% blob contamination
dists = create_distributions(dim=3)
distribution_with_anomalies = contamination(
    nominal=dists['Sinusoidal'],
    anomaly=dists['Blob'],
    p=0.01
)
sample = distribution_with_anomalies.sample(500)
# sample[0] = anomaly labels, sample[-1] = data points
y = sample[0]
X = sample[-1]
fig = px.scatter_3d(x=X[:, 0], y=X[:, 1], z=X[:, 2], color=y)
fig.show()
# Fit KDE on high dimensional examples
rocs = []
auprs = []
bandwidths = []
param_space = {
    'kernel': ['gaussian'],
    'bandwidth': np.linspace(0.1, 100, 1000),  # Define Search space for bandwidth parameter
}
# Fitted models per dimension: d -> (best_params, fitted kde)
kdes = {}
dims = np.arange(2, 16)
for d in tqdm(dims):
    # Generate d dimensional distributions
    dists = create_distributions(dim=d)
    distribution_with_anomalies = contamination(
        nominal=dists['Sinusoidal'],
        anomaly=dists['Blob'],
        p=0  # no contamination here; we study only the fitted density
    )
    # Train on clean data
    sample_train = dists['Sinusoidal'].sample(500)
    X_train = sample_train[-1].numpy()
    # Test data
    sample_test = distribution_with_anomalies.sample(500)
    X_test = sample_test[-1].numpy()
    y_test = sample_test[0].numpy()
    # Optimize bandwidth
    params, kde = hyperopt_by_score(X_train, param_space)
    kdes[d] = (params, kde)
    bandwidths.append(params['bandwidth'])
    test_scores = -kde.score_samples(X_test)
    # Replace infinite scores (zero estimated density) with a finite value above the maximum
    test_scores = np.where(test_scores == np.inf, np.max(test_scores[np.isfinite(test_scores)])+1, test_scores)
# Plot cross section of pdf
fig, axes = plt.subplots(nrows=2, ncols=7, figsize=(15, 5))
for d, axis in tqdm(list(zip(kdes, axes.flatten()))):
    params, kde = kdes[d]
    lin = np.linspace(-10, 10, 50)
    # Grid points (0, ..., 0, x, y): only the last two coordinates vary
    grid_points = list(it.product(*([[0]]*(d-2)), lin, lin))
    ys, xs = np.meshgrid(lin, lin)
    # The score function of sklearn returns log-densities
    scores = np.exp(kde.score_samples(grid_points)).reshape(50, 50)
    colormesh = axis.contourf(xs, ys, scores)
    axis.set_title("Dim = {}".format(d))
    axis.set_aspect('equal')
# Plot evaluation
print('Crossection of the KDE at (0,...,0, x, y)')
plt.show()
Another drawback of KDE in the context of anomaly detection is that it is not robust against contamination of the data
Definition The breakdown point of an estimator is the smallest fraction of observations that need to be changed so that we can move the estimate arbitrarily far away from the true value.
Example: The sample mean has a breakdown point of $0$. Indeed, for a sample of $x_1,\ldots, x_n$ we only need to change a single value in order to move the sample mean in any way we want. That means that the breakdown point is smaller than $\frac{1}{n}$ for every $n\in\mathbb{N}$.
There are robust replacements for the sample mean:
Switch from quadratic to linear loss at prescribed threshold
import numpy as np
def huber(error, threshold: float):
    """Huber loss: quadratic for |error| <= threshold, linear beyond.

    Implemented with numpy operations, so it works elementwise on arrays
    as well as on scalars (the original `float` annotation was misleading).

    @param error: base error value(s), scalar or np.ndarray
    @param threshold: threshold for the quadratic-to-linear transition
    @return: loss value(s), same shape as `error`
    """
    # Boolean mask: 1 where the quadratic branch applies, 0 otherwise
    is_quadratic = (np.abs(error) <= threshold)
    return (is_quadratic * (error**2)/2) + ((1 - is_quadratic)*threshold*(np.abs(error) - threshold/2))
# Visualize the Huber loss with threshold 1
x = np.linspace(-5, 5)
y = huber(x, 1)
plt.plot(x, y)
plt.gca().set_title("Huber Loss")
plt.show()
More complex loss function. It depends on 3 parameters $0 < a < b < r$.
import numpy as np
def single_point_hampel(error: float, a: float, b: float, r: float):
    """Hampel loss for a single scalar error.

    Quadratic up to a, linear with slope a on (a, b], linearly tapering
    slope on (b, r], and constant beyond r.

    @param error: base error
    @param a: 1st threshold parameter
    @param b: 2nd threshold parameter
    @param r: 3rd threshold parameter
    """
    magnitude = abs(error)
    if magnitude <= a:
        # Quadratic core
        return magnitude ** 2 / 2
    if magnitude <= b:
        # Linear section with slope a
        return a ** 2 / 2 + a * (magnitude - a)
    if magnitude <= r:
        # Slope tapers off linearly between b and r
        taper = 1 + (r - magnitude) / (r - b)
        return a * (2 * b - a + (magnitude - b) * taper) / 2
    # Constant tail
    return a * (b - a + r) / 2
# Vectorize the scalar Hampel loss so it can be applied to arrays
hampel = np.vectorize(single_point_hampel)
x = np.linspace(-10.1, 10.1)
y = hampel(x, a=1.5, b=3.5, r=8)
plt.plot(x, y)
plt.gca().set_title("Hampel Loss")
plt.show()
Kernel as scalar product:

$^*$All kernels that we have seen are radial and monotonic
We compare the performance of different approaches to recover the nominal distribution under contamination. Here, we use code by Humbert et al. to replicate the results in the referenced paper on median-of-mean KDE. More details on rKDE can instead be found in this paper by Kim and Scott.
# =======================================================
# Parameters
# =======================================================
algos = [
    'kde',
    'mom-kde',  # Median-of-Means
    'rkde-huber',  # robust KDE with huber loss
    'rkde-hampel',  # robust KDE with hampel loss
]
dataset = 'house-prices'
dataset_options = {'neighborhood': 'CollgCr', 'anomaly_neighborhood': 'Edwards'}
# Contamination levels to evaluate
outlierprop_range = [0.01, 0.02, 0.03, 0.05, 0.07, 0.1, 0.2, 0.3, 0.4, 0.5]
kernel = 'gaussian'
# Run the benchmark; the result is grouped by 'algo'/'outlier_prop' below
auc_scores = perform_rkde_experiment(
    algos,
    dataset,
    dataset_options,
    outlierprop_range,
    kernel,
)
fig, ax = plt.subplots(figsize=(7, 5))
# One line per algorithm: mean anomaly-AUC vs. contamination level
for algo, algo_data in auc_scores.groupby('algo'):
    # Compute the grouped mean once instead of twice per iteration
    mean_by_prop = algo_data.groupby('outlier_prop').mean()
    ax.plot(mean_by_prop.index, mean_by_prop['auc_anomaly'], 'o-', label=algo)
plt.legend()
plt.xlabel('outlier_prop')
plt.ylabel('auc_score')
plt.title('Comparison of rKDE against contamination')
Try using different neighborhoods for contamination. Which robust KDE algorithm performs better overall? Choose among the following options:
OldTown, Veenker, Edwards, MeadowV, Somerst, NPkVill, BrDale, Gilbert, NridgHt, Sawyer, Blmngtn, Blueste
You can also change the kernel type: gaussian, tophat, epanechnikov, exponential, linear or cosine.
Idea: An anomaly should allow "simple" descriptions that distinguish it from the rest of the data.
Moreover, we assume that a short random description will have a significantly larger chance of isolating an anomaly than isolating any nominal point.
Isolation Forest (iForest) implements this idea by generating an ensemble of random decision trees. Each tree is built as follows:
Input: Data set (subsample) $X$, maximal height $h$
Isolation Tree as Partition Diagram
Input: Observation $x$
![]() |
![]() |
![]() |
![]() |
New Rule to Choose Split Test:
New split criterion:
In the final exercise of today you will have to develop an anomaly detection system for network traffic.
A large e-commerce company A is experiencing downtime due to attacks on their infrastructure. You were instructed to develop a system that can detect malicious connections to the infrastructure. It is planned that suspicious clients will be banned.
Another data science team already prepared the connection data of the last year for you. They also separated a test set and manually identified and labeled attacks in that data.
We will work on a version of the classic KDD99 data set.
The KDD Cup '99 dataset was created by processing the tcp dump portions
of the 1998 DARPA Intrusion Detection System (IDS) Evaluation dataset,
created by MIT Lincoln Lab [1]. The artificial data (described on the dataset's
homepage <https://kdd.ics.uci.edu/databases/kddcup99/kddcup99.html>_) was
generated using a closed network and hand-injected attacks to produce a
large number of different types of attack with normal activity in the
background.
========================= ====================================================
Samples total 976158
Dimensionality 41
Features string (str), discrete (int), continuous (float)
Targets str, 'normal.' or name of the anomaly type
Proportion of Anomalies 1%
========================= ====================================================
You will have to develop the system on your own. In particular, you will have to
X_train, X_test, y_test = get_kdd_data()
#
# Add your exploration code
#
X_train = pd.DataFrame(X_train)
X_test = pd.DataFrame(X_test)
# get description
X_train.describe()
# get better description: columns 1-3 hold strings, drop them and cast the rest to float
X_train.drop(columns=[1, 2, 3]).astype(float).describe()
# Check for NaNs
print("Number of NaNs: {}".format(X_train.isna().sum().sum()))
#
# Add your preparation code here
#
# Encode string features (columns 1-3) as one-hot vectors
binarizer = LabelBinarizer()
one_hots = None
one_hots_test = None
for i in [1, 2, 3]:
    # Fit on the train categories, then transform both splits consistently
    binarizer.fit(X_train[[i]].astype(str))
    if one_hots is None:
        one_hots = binarizer.transform(X_train[[i]].astype(str))
        one_hots_test = binarizer.transform(X_test[[i]].astype(str))
    else:
        one_hots = np.concatenate([one_hots, binarizer.transform(X_train[[i]].astype(str))], axis=1)
        one_hots_test = np.concatenate([one_hots_test, binarizer.transform(X_test[[i]].astype(str))], axis=1)
# Replace the string columns with their one-hot encodings
X_train.drop(columns=[1, 2, 3], inplace=True)
X_train_onehot = pd.DataFrame(np.concatenate([X_train.values, one_hots], axis=1))
X_test.drop(columns=[1, 2, 3], inplace=True)
X_test_onehot = pd.DataFrame(np.concatenate([X_test.values, one_hots_test], axis=1))
# Encode y: 0 = normal connection, 1 = attack
y_test_bin = np.where(y_test == b'normal.', 0, 1)
# Remove suspicious data
# This step is not strictly necessary but can improve performance
# 4 sigma rule: flag values more than 4 standard deviations from the column
# mean (only for columns with std > 1, which skips the one-hot columns)
suspicious = X_train_onehot.apply(lambda col: (col - col.mean()).abs() > 4 * col.std() if col.std() > 1 else False)
suspicious = suspicious.any(axis=1)
print('filtering {} suspicious data points'.format(suspicious.sum()))
X_train_clean = X_train_onehot[~suspicious]
# TODO: implement proper model selection
iforest = IsolationForest()
iforest.fit(X_train_clean)
# find best threshold: split the labeled test data into validation and test halves
X_test_onehot, X_val_onehot, y_test_bin, y_val_bin = train_test_split(X_test_onehot, y_test_bin, test_size=.5)
# Negate score_samples so that higher values mean more anomalous
y_score = -iforest.score_samples(X_val_onehot).reshape(-1)
#
# Insert evaluation code
#
# calculate scores if any anomaly is present
if np.any(y_val_bin == 1):
    # 'eval' is a Python builtin -- use a distinct name for the curves dict
    eval_curves = evaluate(y_val_bin, y_score)
    prec, rec, thr = eval_curves['PR']
    # Pick the threshold that maximizes F1 on the validation split
    f1s = 2 * (prec * rec)/(prec + rec)
    threshold = thr[np.argmax(f1s)]
    # Score the held-out test half and classify with the chosen threshold
    y_score = -iforest.score_samples(X_test_onehot).reshape(-1)
    y_pred = np.where(y_score < threshold, 0, 1)
    print('Precision: {}'.format(metrics.precision_score(y_test_bin, y_pred)))
    print('Recall: {}'.format(metrics.recall_score(y_test_bin, y_pred)))
    print('F1: {}'.format(metrics.f1_score(y_test_bin, y_pred)))
Idea: Embed the data into low dimensional space and reconstruct it again. Good embedding of nominal data $\Rightarrow$ high reconstruction error indicates anomaly.
Autoencoder:
Autoencoder Schema
Neural networks are very well suited for finding low dimensional representations of data. Hence they are a popular choice for the encoder and the decoder.
Artificial Neuron with $N$ inputs: $y = \sigma\left(\sum_i^N w_i X_i + b\right)$
![]() |
![]() |
Neural networks combine many artificial neurons into a complex network. These networks are usually organized in layers where the result of each layer is the input for the next layer. Some commonly used layers are:
An important extension of autoencoders that relates the idea to density estimation. More precisely, we define a generative model for our data using latent variables and combine the maximum likelihood estimation of the parameters with a simultaneous posterior estimation of the latents through amortized stochastic variational inference. We use a decoder network to transform the latent variables into the data distribution, and an encoder network to compute the posterior distribution of the latents given the data.
Definition: The model uses an observed variable $X$ (the data) and a latent variable $Z$ (the defining features of $X$). We assume both $P(Z)$ and $P(X\mid Z)$ to be normally distributed. More precisely
where $\mu_\phi$ is a neural network parametrized with $\phi$. We use variational inference to perform posterior inference on $Z$ given $X$. We assume that the distribution $P(Z\mid X)$ to be relatively well approximated by a Gaussian and use the posterior approximation:
$\mu_\psi$ and $\sigma_\psi$ are neural networks parameterized with $\psi$
Given a data set $D$ we minimize the (amortized) Kullback-Leibler divergence between our posterior approximation and the true posterior: \begin{align*} D_{KL}(q(z\mid x),p(z\mid x)) &= E_{x\sim X, z\sim q(Z\mid x)}\left[\log\left(\frac{q(z \mid x)}{p(z \mid X)}\right)\right] \\ &= E_{x\sim X, z\sim q(Z\mid X)}\left[\log\left(\frac{q(z \mid x)}{\frac{p(x \mid z)p(z)}{p(x)}}\right)\right] \\ &= E_{x\sim X, z\sim q(Z\mid x)}\left[\log\left(\frac{q(z \mid x)}{p(x \mid z)p(z)}\right) + \log(p(x))\right] \\ &= E_{x\sim X, z\sim q(Z\mid x)}\left[\log\left(\frac{q(z \mid x)}{p(x \mid z)p(z)}\right)\right] + E_{x\sim X}[\log(p(x))]\\ \end{align*}
Now we can define
\begin{align*} \mathrm{ELBO}(q(z\mid x),p(z\mid x)) &:= E_{x\sim X}[\log(p(x))] - D_{KL}(q(z\mid x),p(z\mid x)) \\ &= -E_{x\sim X, z\sim q(Z\mid x)}\left[\log\left(\frac{q(z \mid x)}{p(x \mid z)p(z)}\right)\right] \end{align*}Note that we can evaluate the expression inside the expectation of the final RHS of the equation and we can obtain unbiased estimates of the expectation via sampling. Let us further try to understand the ELBO as an optimization objective. On one hand, maximizing the ELBO with respect to the parameters in $q$ is equivalent to minimizing the KL divergence between $p$ and $q$. On the other hand, maximizing the ELBO with respect to the parameters in $p$ can be understood as raising a lower bound for the likelihood of the generative model $p(x)$. Hence, the optimization tries to find an encoder and a decoder pair such that it simultaneously provides a good generative explanation of the data and a good approximation of the posterior distribution of the latent variables.
MNIST is one of the most iconic data sets in the history of machine learning. It contains 70000 samples of $28\times 28$ grayscale images of handwritten digits. Because of its moderate complexity and good visualizability it is well suited to study the behavior of machine learning algorithms in higher dimensional spaces.
While originally created for classification (optical character recognition), we can build an anomaly detection data set by corrupting some of the images.
We first need to obtain the MNIST data set and prepare an anomaly detection set from it. Note that the data set is in row vector format. Therefore, we work with $28\times 28 = 784$ dimensional data points.
# Download MNIST and inspect its shape
mnist = get_mnist_data()
data = mnist['data']
print('data.shape: {}'.format(data.shape))
target = mnist['target'].astype(int)
We prepared a function that does the job for us. It corrupts a prescribed portion of the data by introducing a rotation, noise or a blackout of some part of the image.
First, we need to transform the data into image format.
# Reshape row vectors to 28x28x1 images and scale pixel values to [0, 1]
X = data.reshape(-1, 28, 28, 1)/255
We will only corrupt the test set, hence we will perform the train-test split beforehand. We separate a relatively small test set so that we can use as much as possible from the data to obtain high quality representations.
# Hold out a small test set; only the test images get contaminated
test_size = .1
X_train, X_test, target_train, target_test = train_test_split(X, target, test_size=test_size)
X_test, y_test = build_contaminated_minst(X_test)
# Visualize contamination: show 25 randomly chosen corrupted test images
anomalies = X_test[y_test != 0]
selection = np.random.choice(len(anomalies), 25)
fig, axes = plt.subplots(nrows=5, ncols=5, figsize=(5, 5))
for img, ax in zip(anomalies[selection], axes.flatten()):
    ax.imshow(img, 'gray')
    ax.axis('off')
plt.show()
Let us finally train an autoencoder model. We replicate the model given in the Keras documentation and apply it in a synthetic outlier detection scenario based on MNIST.
In the vae package we provide the implementation of the VAE. Please take a look at the source code to see how the minimization of the KL divergence is implemented.
# Build a VAE with a 3-dimensional latent space
latent_dim = 3
vae = VAE(decoder=build_decoder_mnist(latent_dim=latent_dim), encoder=build_encoder_minst(latent_dim=latent_dim))
## Inspect model architecture
vae.encoder.summary()
## Inspect model architecture
vae.decoder.summary()
# train model
n_epochs = 30
vae.compile(optimizer=keras.optimizers.Adam(learning_rate=.001))
history = vae.fit(X_train, epochs=n_epochs, batch_size=128)
import matplotlib.pyplot as plt
def plot_latent_space(vae: VAE, n: int = 10, figsize: float = 10):
    """Plot sample images from 2D slices of latent space
    @param vae: vae model
    @param n: sample nXn images per slice
    @param figsize: figure size
    """
    # One slice per permutation: the third latent coordinate is held at 0
    for perm in [[0, 1, 2], [1, 2, 0], [2, 1, 0]]:
        # display a n*n 2D manifold of digits
        digit_size = 28
        scale = 1.0
        figure = np.zeros((digit_size * n, digit_size * n))
        # linearly spaced coordinates corresponding to the 2D plot
        # of digit classes in the latent space
        grid_x = np.linspace(-scale, scale, n)
        grid_y = np.linspace(-scale, scale, n)[::-1]
        for i, yi in enumerate(grid_y):
            for j, xi in enumerate(grid_x):
                z_sample = np.array([[xi, yi, 0]])
                # Permute coordinates so each slice varies a different latent pair
                z_sample[0] = z_sample[0][perm]
                x_decoded = vae.decoder.predict(z_sample)
                digit = x_decoded[0].reshape(digit_size, digit_size)
                # Paste the decoded digit into its grid cell
                figure[
                    i * digit_size : (i + 1) * digit_size,
                    j * digit_size : (j + 1) * digit_size,
                ] = digit
        plt.figure(figsize=(figsize, figsize))
        start_range = digit_size // 2
        end_range = n * digit_size + start_range
        pixel_range = np.arange(start_range, end_range, digit_size)
        sample_range_x = np.round(grid_x, 1)
        sample_range_y = np.round(grid_y, 1)
        plt.xticks(pixel_range, sample_range_x)
        plt.yticks(pixel_range, sample_range_y)
        plt.xlabel("z[{}]".format(perm[0]))
        plt.ylabel("z[{}]".format(perm[1]))
        plt.gca().set_title('z[{}] = 0'.format(perm[2]))
        plt.imshow(figure, cmap="Greys_r")
        plt.show()
plot_latent_space(vae)
# Principal components of the latent representations
pca = PCA()
# Index 2 of the encoder output is used as the latent code
# (presumably the sampled z -- see the vae package source)
latents = vae.encoder.predict(X_train)[2]
pca.fit(latents)
# One slider per latent dimension, each ranging over [-1, 1]
kwargs = {'x_{}'.format(i): (-1., 1.) for i in range(latent_dim)}
@widgets.interact(**kwargs)
def explore_latent_space(**kwargs):
    """Widget to explore the latent space from a given start position.

    Each slider offsets the PCA image of the latent-space origin along one
    principal component; the decoded image is displayed.
    """
    center_img = pca.transform(np.zeros([1, latent_dim]))
    latent_rep_pca = center_img + np.array([[kwargs[key] for key in kwargs]])
    latent_rep = pca.inverse_transform(latent_rep_pca)
    img = vae.decoder(latent_rep).numpy().reshape(28, 28)
    fig, ax = plt.subplots()
    # (duplicate ax.axis('off') call removed)
    ax.axis('off')
    ax.imshow(img, cmap='gray', vmin=0, vmax=1)
    plt.show()
# Latent representations of the train data, colored by digit class
latents = vae.encoder.predict(X_train)[2]
scatter = px.scatter_3d(x=latents[:, 0], y=latents[:, 1], z=latents[:, 2], color=target_train)
scatter.show()
# Latent representations of the (contaminated) test data, colored by anomaly label
latents = vae.encoder.predict(X_test)[2]
scatter = px.scatter_3d(x=latents[:, 0], y=latents[:, 1], z=latents[:, 2], color=y_test)
scatter.show()
# Split the contaminated test data into test and validation parts
X_test, X_val, y_test, y_val = train_test_split(X_test, y_test)
# Show n_samples random validation images next to their reconstructions
n_samples = 10
s = np.random.choice(range(len(X_val)), n_samples)
s = X_val[s]
#s = [X_train_img[i] for i in s]
fig, axes = plt.subplots(nrows=2, ncols=n_samples, figsize=(10, 2))
for img, ax_row in zip(s, axes.T):
    # Encode then decode; index 2 of the encoder output is the latent code
    x = vae.decoder.predict(vae.encoder.predict(img.reshape(1, 28, 28, 1))[2]).reshape(28, 28)
    diff = x - img.reshape(28, 28)
    # Squared reconstruction error, shown as the column title
    error = (diff * diff).sum()
    ax_row[0].axis('off')
    ax_row[1].axis('off')
    ax_row[0].imshow(img, cmap='gray', vmin=0, vmax=1)
    ax_row[1].imshow(x, cmap='gray', vmin=0, vmax=1)
    ax_row[1].set_title('E={:.1f}'.format(error))
plt.tight_layout()
plt.show()
from sklearn import metrics
# Binary labels: 0 = nominal, 1 = any corruption type
y_test_bin = y_test.copy()
y_test_bin[y_test != 0] = 1
y_val_bin = y_val.copy()
y_val_bin[y_val != 0] = 1
# Evaluate: squared reconstruction error serves as the anomaly score
reconstruction = vae.decoder.predict(vae.encoder(X_val)[2])
rerrors = (reconstruction - X_val).reshape(-1, 28*28)
rerrors = (rerrors * rerrors).sum(axis=1)
# Let's calculate scores if any anomaly is present
if np.any(y_val_bin == 1):
    # 'eval' is a Python builtin -- use a distinct name for the curves dict
    eval_curves = evaluate(y_val_bin.astype(int), rerrors.astype(float))
    pr, rec, thr = eval_curves['PR']
    # F1 per threshold; the last PR point has no threshold, hence [:-1]
    f1s = (2 * ((pr * rec)[:-1]/(pr + rec)[:-1]))
    # Threshold maximizing validation F1
    threshold = thr[np.argmax(f1s)]
    print('Optimal threshold: {}'.format(threshold))
    # Score the test half with the same reconstruction-error criterion
    reconstruction = vae.decoder.predict(vae.encoder(X_test)[2])
    reconstruction_error = (reconstruction - X_test).reshape(-1, 28*28)
    reconstruction_error = (reconstruction_error * reconstruction_error).sum(axis=1)
    classification = (reconstruction_error > threshold).astype(int)
    print('Precision: {}'.format(metrics.precision_score(y_test_bin, classification)))
    print('Recall: {}'.format(metrics.recall_score(y_test_bin, classification)))
    print('F1: {}'.format(metrics.f1_score(y_test_bin, classification)))
    metrics.confusion_matrix(y_test_bin, classification)
else:
    # No labeled anomalies available -- skip evaluation
    reconstruction_error = None
# Sort test images by reconstruction error (ascending)
if reconstruction_error is not None:
    combined = list(zip(X_test, reconstruction_error))
    combined.sort(key=lambda x: x[1])
# Show the n_rows x n_cols images with the highest reconstruction error
if reconstruction_error is not None:
    n_rows = 10
    n_cols = 10
    n_samples = n_rows*n_cols
    samples = [c[0] for c in combined[-n_samples:]]
    fig, axes = plt.subplots(nrows=n_rows, ncols=n_cols, figsize=(2*n_cols, 2*n_rows))
    for img, ax in zip(samples, axes.reshape(-1)):
        ax.axis('off')
        ax.imshow(img.reshape((28, 28)), cmap='gray', vmin=0, vmax=1)
    plt.show()